package handle

import (
	"database/sql"
	"fmt"
	"os"
	"redis-check/common"
	"strconv"
	"sync"
)

type CompareDb struct {
	DbFile        string
	ResultFile    string
	BatchCount    int
	Times         int
	db            *sql.DB
	currentDB     int32
	conflictQueue chan *common.Key
	wg            sync.WaitGroup
	final         bool
}

func NewCompareDb(dbFile string, resultFile string, batchCount int, times int) *CompareDb {
	p := &CompareDb{
		DbFile:     dbFile,
		ResultFile: resultFile,
		BatchCount: batchCount,
		Times:      times,
	}
	p.db = p.createDb(p.Times, true)
	return p
}

func (p *CompareDb) StartSave(db int32) {
	p.currentDB = db
	p.conflictQueue = make(chan *common.Key, 100)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer p.db.Close()
		p.saveValues()
	}()
}

func (p *CompareDb) ConflictQueue() chan *common.Key {
	return p.conflictQueue
}

func (p *CompareDb) FinishSave() {
	if p.conflictQueue != nil {
		close(p.conflictQueue)
		p.conflictQueue = nil
	}
	p.wg.Wait()
}

func (p *CompareDb) Destroy() {
	if p.conflictQueue != nil {
		close(p.conflictQueue)
		p.conflictQueue = nil
	}
	if p.db != nil {
		p.db.Close()
		p.db = nil
	}
}

func (p *CompareDb) saveValues() {
	if p.conflictQueue == nil {
		return
	}
	var resultfile *os.File
	if len(p.ResultFile) > 0 {
		resultfile, _ = os.OpenFile(p.ResultFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
		defer resultfile.Close()
	}

	tx, _ := p.db.Begin()
	statInsertKey, err := tx.Prepare(fmt.Sprintf("insert into key_result (key, type, conflict_type, db, source_len, target_len) values(?,?,?,?,?,?)"))
	if err != nil {
		panic(common.Logger.Error(err))
	}
	statInsertField, err := tx.Prepare(fmt.Sprintf("insert into field_result (field, conflict_type, key_id) values (?,?,?)"))
	if err != nil {
		panic(common.Logger.Error(err))
	}

	count := 0
	for oneKeyInfo := range p.conflictQueue {
		if count%1000 == 0 {
			var err error
			statInsertKey.Close()
			statInsertField.Close()
			e := tx.Commit()
			if e != nil {
				common.Logger.Error(e.Error())
			}

			tx, _ = p.db.Begin()
			statInsertKey, err = tx.Prepare(fmt.Sprintf("insert into key_result (key, type, conflict_type, db, source_len, target_len) values(?,?,?,?,?,?)"))
			if err != nil {
				panic(common.Logger.Error(err))
			}

			statInsertField, err = tx.Prepare(fmt.Sprintf("insert into field_result (field, conflict_type, key_id) values (?,?,?)"))
			if err != nil {
				panic(common.Logger.Error(err))
			}
		}
		count += 1

		result, err := statInsertKey.Exec(string(oneKeyInfo.Key), oneKeyInfo.Tp.Name, oneKeyInfo.ConflictType.String(), p.currentDB, oneKeyInfo.SourceAttr.ItemCount, oneKeyInfo.TargetAttr.ItemCount)
		if err != nil {
			panic(common.Logger.Error(err))
		}
		if len(oneKeyInfo.Field) != 0 {
			lastId, _ := result.LastInsertId()
			for i := 0; i < len(oneKeyInfo.Field); i++ {
				_, err = statInsertField.Exec(string(oneKeyInfo.Field[i].Field), oneKeyInfo.Field[i].ConflictType.String(), lastId)
				if err != nil {
					panic(common.Logger.Error(err))
				}

				if p.final {
					finalstat, err := tx.Prepare(fmt.Sprintf("insert into FINAL_RESULT (InstanceA, InstanceB, Key, Schema, InconsistentType, Extra) VALUES(?, ?, ?, ?, ?, ?)"))
					if err != nil {
						panic(common.Logger.Error(err))
					}
					// defer finalstat.Close()
					_, err = finalstat.Exec("", "", string(oneKeyInfo.Key), strconv.Itoa(int(p.currentDB)),
						oneKeyInfo.Field[i].ConflictType.String(),
						string(oneKeyInfo.Field[i].Field))
					if err != nil {
						panic(common.Logger.Error(err))
					}

					finalstat.Close()

					if len(p.ResultFile) != 0 {
						resultfile.WriteString(fmt.Sprintf("%d\t%s\t%s\t%s\n", int(p.currentDB), oneKeyInfo.Field[i].ConflictType.String(), string(oneKeyInfo.Key), string(oneKeyInfo.Field[i].Field)))
					}
				}
			}
		} else {
			if p.final {
				finalstat, err := tx.Prepare(fmt.Sprintf("insert into FINAL_RESULT (InstanceA, InstanceB, Key, Schema, InconsistentType, Extra) VALUES(?, ?, ?, ?, ?, ?)"))
				if err != nil {
					panic(common.Logger.Error(err))
				}
				// defer finalstat.Close()
				_, err = finalstat.Exec("", "", string(oneKeyInfo.Key), strconv.Itoa(int(p.currentDB)), oneKeyInfo.ConflictType.String(), "")
				if err != nil {
					panic(common.Logger.Error(err))
				}
				finalstat.Close()

				if len(p.ResultFile) != 0 {
					resultfile.WriteString(fmt.Sprintf("%d\t%s\t%s\t%s\n", int(p.currentDB), oneKeyInfo.ConflictType.String(), string(oneKeyInfo.Key), ""))
				}
			}
		}
	}
	statInsertKey.Close()
	statInsertField.Close()
	tx.Commit()
}

func (p *CompareDb) ScanLastKeys(db int32, keyQueue chan<- []*common.Key) {
	sqlDb := p.createDb(p.Times-1, false)
	defer sqlDb.Close()
	keyQuery := fmt.Sprintf("select id,key,type,conflict_type,source_len,target_len from key_result where id>? and db=%d limit %d", db, p.BatchCount)
	keyStatm, err := sqlDb.Prepare(keyQuery)
	if err != nil {
		panic(common.Logger.Error(err))
	}
	defer keyStatm.Close()

	fieldQuery := fmt.Sprintf("select field,conflict_type from field_result where key_id=?")
	fieldStatm, err := sqlDb.Prepare(fieldQuery)
	if err != nil {
		panic(common.Logger.Error(err))
	}
	defer fieldStatm.Close()

	var startId int64 = 0
	for {
		rows, err := keyStatm.Query(startId)
		if err != nil {
			panic(common.Logger.Error(err))
		}
		keyInfo := make([]*common.Key, 0, p.BatchCount)
		for rows.Next() {
			var key, keytype, conflictType string
			var id, source_len, target_len int64
			err = rows.Scan(&id, &key, &keytype, &conflictType, &source_len, &target_len)
			if err != nil {
				panic(common.Logger.Error(err))
			}
			oneKeyInfo := &common.Key{
				Key:          []byte(key),
				Tp:           common.NewKeyType(keytype),
				ConflictType: common.NewConflictType(conflictType),
				SourceAttr:   common.Attribute{ItemCount: source_len},
				TargetAttr:   common.Attribute{ItemCount: target_len},
			}
			if oneKeyInfo.Tp == common.EndKeyType {
				panic(common.Logger.Errorf("invalid type from table key_result: key=%s type=%s ", key, keytype))
			}
			if oneKeyInfo.ConflictType == common.EndConflict {
				panic(common.Logger.Errorf("invalid conflict_type from table key_result: key=%s conflict_type=%s ", key, conflictType))
			}

			if oneKeyInfo.Tp != common.StringKeyType {
				oneKeyInfo.Field = make([]common.Field, 0, 10)
				rowsField, err := fieldStatm.Query(id)
				if err != nil {
					panic(common.Logger.Error(err))
				}
				for rowsField.Next() {
					var field, conflictType string
					err = rowsField.Scan(&field, &conflictType)
					if err != nil {
						panic(common.Logger.Error(err))
					}
					oneField := common.Field{
						Field:        []byte(field),
						ConflictType: common.NewConflictType(conflictType),
					}
					if oneField.ConflictType == common.EndConflict {
						panic(common.Logger.Errorf("invalid conflict_type from table field_result: field=%s type=%s ", field, conflictType))
					}
					oneKeyInfo.Field = append(oneKeyInfo.Field, oneField)
				}
				if err := rowsField.Err(); err != nil {
					panic(common.Logger.Error(err))
				}
				rowsField.Close()
			}
			keyInfo = append(keyInfo, oneKeyInfo)
			if startId < id {
				startId = id
			}
		} // rows.Next
		if err := rows.Err(); err != nil {
			panic(common.Logger.Error(err))
		}
		rows.Close()
		// 结束
		if len(keyInfo) == 0 {
			close(keyQueue)
			break
		}
		keyQueue <- keyInfo
	} // for{}
}

func (p *CompareDb) createDb(times int, init bool) *sql.DB {
	if init {
		os.Remove(p.DbFile + "." + strconv.Itoa(times))
	}
	db, err := sql.Open("sqlite3", p.DbFile+"."+strconv.Itoa(times))
	if err != nil {
		panic(common.Logger.Critical(err))
	}
	if init {
		p.createDbTable()
	}
	return db
}

func (p *CompareDb) createDbTable() {
	conflictKeyTableSql := fmt.Sprintf(`
CREATE TABLE key_result(
   id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
   key            TEXT NOT NULL,
   type           TEXT NOT NULL,
   conflict_type  TEXT NOT NULL,
   db             INTEGER NOT NULL,
   source_len     INTEGER NOT NULL,
   target_len     INTEGER NOT NULL
);
`)
	_, err := p.db.Exec(conflictKeyTableSql)
	if err != nil {
		panic(common.Logger.Errorf("exec sql %s failed: %s", conflictKeyTableSql, err))
	}
	conflictFieldTableSql := fmt.Sprintf(`
CREATE TABLE field_result(
   id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
   field          TEXT NOT NULL,
   conflict_type  TEXT NOT NULL,
   key_id         INTEGER NOT NULL
);
`)
	_, err = p.db.Exec(conflictFieldTableSql)
	if err != nil {
		panic(common.Logger.Errorf("exec sql %s failed: %s", conflictFieldTableSql, err))
	}

	conflictResultSql := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS final_result(
	InstanceA	TEXT NOT NULL,
	InstanceB	TEXT NOT NULL,
	Key			TEXT NOT NULL,
	Schema		TEXT NOT NULL,
	InconsistentType TEXT NOT NULL,
	Extra	    TEXT NOT NULL
	);`)
	_, err = p.db.Exec(conflictResultSql)
	if err != nil {
		panic(common.Logger.Errorf("exec sql %s failed: %s", conflictResultSql, err))
	}
}
